Skip to content

第14章 RAG架构与原理:基于LangChain构建对话知识库

学习目标

  • 理解检索增强生成(RAG)的核心概念与架构
  • 掌握使用LangChain框架实现RAG系统的方法
  • 学习处理多种文档格式(PDF、Word等)并构建向量数据库
  • 实现一个完整的对话知识库系统

什么是检索增强生成(RAG)?

检索增强生成(RAG)是一种结合检索系统与生成式AI的架构,通过从外部知识库检索相关信息来增强大语言模型的响应能力,特别适合构建专业领域的对话知识库系统。

RAG的核心流程

  1. 文档处理:加载并处理多种格式文档
  2. 文档分割:将文档切分为适当大小的片段
  3. 向量化:将文本片段转换为向量表示
  4. 知识检索:根据用户查询检索相关文档
  5. 上下文增强:将检索内容与用户查询结合
  6. 生成回答:利用增强上下文生成最终回答

使用LangChain实现RAG系统

LangChain提供了构建RAG系统所需的全套组件和工具链,使实现过程变得简单高效。

1. 文档加载与处理

LangChain提供多种文档加载器,支持各类文件格式:

python
from langchain.document_loaders import PyPDFLoader, TextLoader, Docx2txtLoader, CSVLoader

# 加载PDF文档
pdf_loader = PyPDFLoader("data/document.pdf")
pdf_docs = pdf_loader.load()

# 加载Word文档
docx_loader = Docx2txtLoader("data/document.docx")
docx_docs = docx_loader.load()

# 加载文本文件
text_loader = TextLoader("data/document.txt")
text_docs = text_loader.load()

# 加载CSV数据
csv_loader = CSVLoader("data/data.csv")
csv_docs = csv_loader.load()

# 合并所有文档
all_docs = pdf_docs + docx_docs + text_docs + csv_docs

2. 文档分割策略

将长文档切分为适合向量化的片段:

python
from langchain.text_splitter import RecursiveCharacterTextSplitter, CharacterTextSplitter

# 递归文本分割(推荐)
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,       # 每个块的字符数
    chunk_overlap=200,     # 相邻块之间的重叠字符数
    separators=["\n\n", "\n", " ", ""],  # 优先按段落、句子分割
    length_function=len,
)

# 分割文档
chunks = text_splitter.split_documents(all_docs)

3. 文本向量化

使用本地嵌入模型将文本块转换为向量表示:

python
from langchain.embeddings import HuggingFaceEmbeddings

# 使用本地BGE嵌入模型(适合中文场景)
embeddings = HuggingFaceEmbeddings(
    model_name="./bge-large-zh-v1.5",  # 指向本地模型目录
    model_kwargs={'device': 'cpu'},    # 使用CPU运行,对于GPU可设置为'cuda'
    encode_kwargs={'normalize_embeddings': True}  # 归一化嵌入向量以提高检索质量
)

# 或使用本地GTE嵌入模型(适合多语言场景)
# embeddings = HuggingFaceEmbeddings(
#     model_name="./gte-large",  # 指向本地模型目录
#     model_kwargs={'device': 'cpu'},
#     encode_kwargs={'normalize_embeddings': True}
# )

4. 向量数据库

使用Chroma向量数据库存储文档向量:

python
from langchain_chroma import Chroma

# 使用Chroma向量数据库
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding_function=embeddings,  # 注意参数名为embedding_function
    persist_directory="chroma_db"  # 持久化存储目录
)

# 持久化保存数据库
vectorstore.persist()

# 后续可以重新加载已有数据库
vectorstore = Chroma(
    persist_directory="chroma_db",
    embedding_function=embeddings
)

5. 检索查询器

构建检索查询器,支持多种检索策略:

python
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
from langchain_openai import OpenAI

# 基本检索(相似度搜索)
retriever = vectorstore.as_retriever(
    search_type="similarity",    # 支持 similarity, mmr
    search_kwargs={"k": 5}       # 返回前k个结果
)

# 高级检索(内容压缩)- 提取最相关的内容片段
llm = OpenAI(temperature=0)
compressor = LLMChainExtractor.from_llm(llm)
compression_retriever = ContextualCompressionRetriever(
    base_retriever=retriever,
    doc_compressor=compressor
)

6. 构建RAG链

将上述组件整合成一个完整的RAG系统:

python
from langchain.chains import RetrievalQA, ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain_openai import ChatOpenAI

# 初始化LLM
llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)

# 简单问答链
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",  # 支持stuff, map_reduce, refine, map_rerank
    retriever=retriever,
    return_source_documents=True  # 返回来源文档
)

# 对话记忆
memory = ConversationBufferMemory(
    memory_key="chat_history",
    return_messages=True
)

# 对话型检索链
conversation_chain = ConversationalRetrievalChain.from_llm(
    llm=llm,
    retriever=retriever,
    memory=memory,
    return_source_documents=True
)

7. 构建简单交互界面

使用Streamlit快速构建对话知识库界面:

python
import streamlit as st

st.title("智能文档问答系统")

# 初始化会话状态
if "messages" not in st.session_state:
    st.session_state.messages = []

# 显示对话历史
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

# 接收用户输入
if prompt := st.chat_input("请输入您的问题"):
    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.markdown(prompt)
    
    # 生成回答
    with st.chat_message("assistant"):
        with st.spinner("思考中..."):
            result = conversation_chain({"question": prompt})
            response = result["answer"]
            sources = result["source_documents"]
            
            # 显示回答
            st.markdown(response)
            
            # 显示来源文档
            with st.expander("查看来源"):
                for i, doc in enumerate(sources):
                    st.markdown(f"**来源 {i+1}**")
                    st.markdown(doc.page_content)
                    st.markdown(f"*来自: {doc.metadata.get('source', '未知来源')}*")
    
    st.session_state.messages.append({"role": "assistant", "content": response})

完整RAG系统实现示例

下面是一个完整的RAG系统实现,包括文档处理、向量存储和对话接口:

python
import os
from langchain.document_loaders import PyPDFLoader, Docx2txtLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from langchain_openai import ChatOpenAI
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
from langchain.prompts import PromptTemplate

# 配置模型路径和API密钥
os.environ["OPENAI_API_KEY"] = "your-openai-api-key"

class DocumentAssistant:
    def __init__(self, docs_dir="./documents", model_dir="./bge-large-zh-v1.5"):
        self.docs_dir = docs_dir
        self.model_dir = model_dir
        self.documents = []
        self.chunks = []
        self.vectorstore = None
        self.conversation_chain = None
        
    def load_documents(self):
        """加载多种格式的文档"""
        for file in os.listdir(self.docs_dir):
            file_path = os.path.join(self.docs_dir, file)
            
            if file.endswith('.pdf'):
                loader = PyPDFLoader(file_path)
                self.documents.extend(loader.load())
            elif file.endswith('.docx'):
                loader = Docx2txtLoader(file_path)
                self.documents.extend(loader.load())
            elif file.endswith('.txt'):
                loader = TextLoader(file_path)
                self.documents.extend(loader.load())
                
        print(f"已加载 {len(self.documents)} 个文档")
        
    def process_documents(self):
        """处理文档:分割和向量化"""
        # 文档分割
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            separators=["\n\n", "\n", "。", "!", "?", ".", " ", ""],
        )
        self.chunks = text_splitter.split_documents(self.documents)
        print(f"文档已分割为 {len(self.chunks)} 个块")
        
        # 创建向量存储
        embeddings = HuggingFaceEmbeddings(
            model_name=self.model_dir,
            model_kwargs={'device': 'cpu'},
            encode_kwargs={'normalize_embeddings': True}
        )
        
        self.vectorstore = Chroma.from_documents(
            documents=self.chunks, 
            embedding_function=embeddings,
            persist_directory="chroma_document_db"
        )
        
        # 保存向量库
        self.vectorstore.persist()
        print("向量存储已创建并保存")
        
    def load_vectorstore(self):
        """加载已有向量库"""
        if os.path.exists("chroma_document_db"):
            embeddings = HuggingFaceEmbeddings(
                model_name=self.model_dir,
                model_kwargs={'device': 'cpu'},
                encode_kwargs={'normalize_embeddings': True}
            )
            
            self.vectorstore = Chroma(
                persist_directory="chroma_document_db",
                embedding_function=embeddings
            )
            print("已加载向量存储")
            return True
        return False
        
    def setup_qa_chain(self):
        """设置问答链"""
        # 自定义提示模板
        template = """
        你是一个专业的知识助手,使用提供的上下文信息来回答用户的问题。
        如果你不知道答案,就说你不知道,不要试图编造答案。
        尽量使用中文回答问题,除非用户使用其他语言提问。
        使用上下文中的信息来提供准确、有帮助的回答。
        
        上下文信息:
        {context}
        
        对话历史:
        {chat_history}
        
        用户问题: {question}
        """
        
        QA_PROMPT = PromptTemplate(
            input_variables=["context", "chat_history", "question"],
            template=template
        )
        
        # 设置对话记忆
        memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
        
        # 创建对话链
        self.conversation_chain = ConversationalRetrievalChain.from_llm(
            llm=ChatOpenAI(temperature=0, model_name="gpt-3.5-turbo"),
            retriever=self.vectorstore.as_retriever(search_kwargs={"k": 4}),
            memory=memory,
            combine_docs_chain_kwargs={"prompt": QA_PROMPT},
            return_source_documents=True
        )
        print("问答链已设置完成")
        
    def ask(self, question):
        """向系统提问"""
        if not self.conversation_chain:
            return "系统未初始化,请先加载文档"
        
        result = self.conversation_chain({"question": question})
        return {
            "answer": result["answer"],
            "sources": [{"content": doc.page_content, "source": doc.metadata.get("source", "未知")} 
                        for doc in result.get("source_documents", [])]
        }

# 使用示例
if __name__ == "__main__":
    # 可以指定本地模型路径,默认使用 ./bge-large-zh-v1.5
    assistant = DocumentAssistant(model_dir="./bge-large-zh-v1.5") 
    # 如果要使用GTE模型可以替换为 model_dir="./gte-large"
    
    # 如果向量库已存在,直接加载
    if not assistant.load_vectorstore():
        # 否则重新处理文档
        assistant.load_documents()
        assistant.process_documents()
    
    # 设置问答链
    assistant.setup_qa_chain()
    
    # 交互式问答
    while True:
        question = input("\n请输入问题 (输入'退出'结束): ")
        if question.lower() in ['退出', 'exit', 'quit']:
            break
            
        response = assistant.ask(question)
        print("\n回答:", response["answer"])
        
        if len(response["sources"]) > 0:
            print("\n来源文档:")
            for i, source in enumerate(response["sources"]):
                print(f"[{i+1}] {source['source']}: {source['content'][:150]}...")

RAG系统优化策略

1. 文档处理优化

  • 智能分割:根据文档结构(标题、段落)进行语义分割
  • 元数据丰富:为每个文档块添加来源、时间等元数据
  • 内容过滤:去除无意义内容,如页眉页脚、广告等
python
# 添加元数据的例子
for i, doc in enumerate(documents):
    doc.metadata["source"] = f"document_{i}.pdf"
    doc.metadata["date"] = "2023-10-15"  # 文档日期
    doc.metadata["category"] = "finance"  # 文档类别

2. 检索优化

  • 混合检索:结合关键词检索和向量检索
  • 查询扩展:通过LLM扩展原始查询,提高召回率
  • 结果重排序:根据相关性对检索结果进行重新排序
python
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import EmbeddingsFilter

# 语义过滤器 - 提高精确性
embeddings_filter = EmbeddingsFilter(
    embeddings=embeddings,
    similarity_threshold=0.76  # 只保留相似度超过阈值的文档
)

# 压缩检索器
compression_retriever = ContextualCompressionRetriever(
    base_retriever=vectorstore.as_retriever(search_kwargs={"k": 10}),
    doc_compressor=embeddings_filter
)

3. 生成优化

  • 结构化输出:使用输出解析器获取结构化回答
  • 引用增强:在回答中添加引用标记,指向来源文档
  • 一致性检查:确保生成内容与检索文档一致
python
from langchain.output_parsers import StructuredOutputParser, ResponseSchema

# 定义输出格式
response_schemas = [
    ResponseSchema(name="answer", description="对问题的回答"),
    ResponseSchema(name="sources", description="支持回答的信息来源"),
    ResponseSchema(name="confidence", description="对回答确信度的估计(0-100)")
]

output_parser = StructuredOutputParser.from_response_schemas(response_schemas)

# 在提示中使用格式说明
format_instructions = output_parser.get_format_instructions()
prompt_template = """
根据以下上下文回答问题,并按照指定格式输出:
{format_instructions}

上下文: {context}
问题: {question}
"""

常见RAG应用场景与实现思路

1. 企业内部知识库

连接公司文档、Wiki、产品手册等,构建内部智能助手:

python
# 企业知识库专用提示模板
enterprise_template = """
你是[公司名]的内部知识助手。你的回答必须:
1. 只使用提供的上下文信息回答问题
2. 引用确切的公司政策和规程
3. 如果不确定,建议用户联系相关部门
4. 保持专业、简洁的语气

上下文: {context}
问题: {question}
"""

2. 个性化学习助手

结合课程材料、教科书等,创建个性化学习体验:

python
# 学习助手专用回答生成函数
def generate_learning_answer(question, contexts, llm):
    # 1. 直接回答问题
    answer = llm.predict(f"基于以下资料回答问题: {contexts} 问题: {question}")
    
    # 2. 生成相关练习题
    practice_questions = llm.predict(
        f"基于这个问题和上下文,生成3个相关的练习题: {question} {contexts[:500]}"
    )
    
    # 3. 提供进一步学习资源
    additional_resources = llm.predict(
        f"推荐进一步学习的主题或资源,与此问题相关: {question}"
    )
    
    return {
        "answer": answer,
        "practice_questions": practice_questions,
        "additional_resources": additional_resources
    }

3. 技术文档助手

支持API文档、框架说明等技术内容查询:

python
# 代码示例生成
def generate_code_example(docs, question, language="python"):
    # 找到相关API文档
    relevant_docs = retrieve_relevant_docs(docs, question)
    
    # 基于文档生成代码示例
    prompt = f"""
    根据以下API文档,使用{language}语言生成一个简洁的代码示例:
    API文档: {relevant_docs}
    需求: {question}
    
    请仅返回代码,附带简短注释说明。
    """
    
    return llm.predict(prompt)

思考题

  1. 在处理大型文档集时,如何平衡向量数据库的性能与检索质量?

  2. 对于专业领域的RAG系统,如何选择合适的文档分割策略和块大小?

  3. 在使用LangChain实现RAG系统时,如何有效地处理多语言文档?

  4. 设计一个能够处理表格、图片等多模态信息的RAG系统,需要考虑哪些额外因素?

  5. 如何评估和优化RAG系统的检索性能和生成质量?

接下来,我们将深入探讨如何构建和部署一个完整的RAG系统,并进行性能优化和评估。